package com.rtsapp.server.domain.mysql.sql.impl;

import com.rtsapp.server.domain.mysql.sql.MySQLConnectionInfo;
import com.rtsapp.server.domain.mysql.sql.MySQLPreparedStatement;
import com.rtsapp.server.domain.mysql.sql.MySQLResultSetHandler;
import com.rtsapp.server.domain.mysql.sql.MySQLTransaction;

import com.rtsapp.server.profiling.Profilling;
import com.rtsapp.server.logger.Logger;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/***
 * 数据库连接:
 * <p>
 * 重点在于对失败的处理,和稳定性
 */
public class MySQLConnection {

    private static Logger logger =com.rtsapp.server.logger.LoggerFactory.getLogger(MySQLConnection.class);


    public static final int CONNECTION_SYNCH = 1; //同步
    public static final int CONNECTION_ASYNC = 2; //异步

    private static final String PING_INDEX = "ping_index";
    private static final String PING_SQL = "select 1";

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }


    //连接
    private MySQLConnectionInfo connectionInfo = null; //连接信息
    private int connectionFlag; //连接标识
    private Connection conn = null; //连接
    private boolean reconnecting = false; //是否重连中
    private Map<String, String> prepareSqlMap = null;
    private Map<String, PreparedStatement> stmtMap = null;
    private boolean prepareError = false;
    //线程锁，用于连接在多个线程中
    private Lock mutex = null;
    //异步需要
    private BlockingQueue<SQLOperation> queue = null; //阻塞队列
    private DatabaseWorker worker = null; //工作线程

    public MySQLConnection(MySQLConnectionInfo connInfo) {
        this.connectionInfo = connInfo;
        this.connectionFlag = CONNECTION_SYNCH;
        this.conn = null;
        this.reconnecting = false;
        this.prepareError = false;
        this.stmtMap = new HashMap<String, PreparedStatement>();
        this.prepareSqlMap = new HashMap<>( );

        this.mutex = new ReentrantLock();

        this.queue = null;
        this.worker = null;
    }


    public MySQLConnection(BlockingQueue<SQLOperation> queue, MySQLConnectionInfo connInfo , int queueIndex ) {
        this.connectionInfo = connInfo;
        this.connectionFlag = CONNECTION_ASYNC;
        this.conn = null;
        this.reconnecting = false;
        this.prepareError = false;
        this.stmtMap = new HashMap<String, PreparedStatement>();
        this.prepareSqlMap = new HashMap<>( );

        this.mutex = new ReentrantLock();

        this.queue = queue;
        this.worker = new DatabaseWorker(this.queue, this, queueIndex);
    }


    public String getConnIpPortDatabase(){
        return this.connectionInfo.getHost() + ":" + this.connectionInfo.getPort() + ":" + this.connectionInfo.getDatabase();
    }

    public boolean open() {

        String url = this.connectionInfo.getJDBCURL();
        try {
            logger.debug( "尝试连接数据库, url=" + url + ", user=" + this.connectionInfo.getUser() + ",pwd=" + this.connectionInfo.getPassword() );

            this.conn = DriverManager.getConnection(url, this.connectionInfo.getUser(), this.connectionInfo.getPassword() );

            prepareStatement( PING_INDEX, PING_SQL );

            return true;
        } catch (SQLException e) {

            logger.error( "连接数据库失败,url=" + url + ", user=" + this.connectionInfo.getUser() + ",pwd=" + this.connectionInfo.getPassword(), e );
            return false;
        }
    }

    public void close() {

        try {
            if( this.conn != null && !this.conn.isClosed( ) ){
                this.conn.close();
                this.conn = null;
            }
        } catch (SQLException e) {
            logger.error("关闭连接失败", e );
        }
    }

    /**
     * 关闭数据库连接，并停止对应的数据库操作线程
     */
    public void stop(){

        this.close();

        if( this.worker != null ){
            this.worker.stop();
        }
    }



    protected boolean prepareStatement( String index, String sql ) {

        if( this.reconnecting ) {

            PreparedStatement stmt = this.getPreparedStatement(index);

            if (stmt != null) {
                this.stmtMap.remove(index);
                try {
                    if (!stmt.isClosed()) {
                        stmt.close();
                    }
                } catch (SQLException e) {
                   logger.error( "stmt关闭失败", e );
                }
            }
        }

        PreparedStatement oldStmt = this.getPreparedStatement(index);
        if( oldStmt != null ){
            logger.info("[预编译sql] PreparedStatement已经存在:index={}, sql={}", index , sql );
            return true;
        }

        try {

            PreparedStatement stmt = conn.prepareStatement(sql);

            this.stmtMap.put(index, stmt);
            this.prepareSqlMap.put(index, sql );
            Profilling.getInstance().getExecuteProfilling( Profilling.CategoryIds.SQL, index.hashCode() ).setExecuteName( sql );
            logger.debug( "[预编译sql] 成功 index={}, sql={}", index, sql );
            return true;
        } catch (SQLException e) {
            logger.error( "[预编译sql] 失败 index={}, sql={}", index, sql );
            return false;
        }

    }

    public PreparedStatement getPreparedStatement( String index ) {
        return this.stmtMap.get(index);
    }


    public boolean execute(MySQLPreparedStatement stmt) {

        if ( conn == null ) {
            return false;
        }

        long startTime = System.nanoTime();

        String index = stmt.getIndex();

        PreparedStatement preStmt = getPreparedStatement(index);
        try {
            stmt.bindParameters(preStmt);

            if( logger.isDebugEnabled() ) {
                logger.debug( "[SQL IN_UP_DEL] [index={}] [sql={}]", index, preStmt.toString());
            }

            preStmt.executeUpdate();

            clearPreparedStatementParameters(preStmt);

            //性能统计
            Profilling.getInstance().getExecuteProfilling( Profilling.CategoryIds.SQL, index.hashCode() ).logSuccess( System.nanoTime() - startTime );

            return true;

        }  catch (SQLException e) {
            logger.error("数据库操作错误execute, 尝试处理错误, Index为" + index, e);
            // 如果出错啦，需要再次执行
            if ( this.handleMySQLError( e ) ) {
                logger.error("错误判断为可恢复, 重新执行, Index为" + index );
                return execute(stmt);
            } else {
                logger.error("错误判断为不可恢复, 记录执行的sql, Index为" + index );
                logExecuteExceptionPrepredStatement(index, preStmt);
                clearPreparedStatementParameters(preStmt);

                //性能统计
                Profilling.getInstance().getExecuteProfilling( Profilling.CategoryIds.SQL, index.hashCode() ).logFail();

                return false;
            }
        }
    }


    public boolean query( MySQLPreparedStatement stmt, MySQLResultSetHandler handler) {

        if (conn == null) {
            return false;
        }

        long startTime = System.nanoTime();

        String index = stmt.getIndex();

        PreparedStatement preStmt = getPreparedStatement(index);
        try {
            stmt.bindParameters(preStmt);

            if( logger.isDebugEnabled() ) {
                logger.debug( "[SQL QUERY] [index={}] [sql={}]" , index,  preStmt.toString());
            }

            ResultSet rs = preStmt.executeQuery();

            handler.doResultSet(rs);

            clearPreparedStatementParameters(preStmt);

            rs.close();

            //性能统计
            Profilling.getInstance().getExecuteProfilling( Profilling.CategoryIds.SQL, index.hashCode() ).logSuccess(System.nanoTime() - startTime);

            return true;

        } catch (SQLException e) {

            logger.error("数据库操作错误query, 尝试处理错误, Index为" + index,  e );
            // 如果出错啦，需要再次执行
            if (this.handleMySQLError( e )) {
                logger.error("错误判断为可恢复, 重新执行, Index为" + index );
                return query(stmt, handler);
            } else {
                logger.error("错误判断为不可恢复, 记录执行的sql, Index为" + index );
                logExecuteExceptionPrepredStatement(index, preStmt);
                clearPreparedStatementParameters(preStmt);

                //性能统计
                Profilling.getInstance().getExecuteProfilling( Profilling.CategoryIds.SQL, index.hashCode() ).logFail();

                return false;
            }

        }

    }

    //记录未执行的PrepredStatement
    private void logExecuteExceptionPrepredStatement( String index, PreparedStatement stmt ){
        logger.error("记录执行失败的sql语句");
        logger.error( "Index=" + index );
        logger.error( "SQL=" + stmt.toString( )  );
    }


    private void clearPreparedStatementParameters(PreparedStatement stmt) {
        try {
            stmt.clearParameters();
        } catch (SQLException e ) {
            logger.error("clearParameters 错误", e);
        }
    }




    public boolean executeTransaction(MySQLTransaction trans) {

        beginTransaction();

        for ( MySQLPreparedStatement stmt : trans.getPreparedStatementList() ) {
            if ( !execute(stmt) ) {
                rollbackTransaction();
                return false;
            }
        }

        commitTransaction();
        return true;
    }



    public void beginTransaction( ) {
        try {
            this.conn.setAutoCommit(false);
        } catch (SQLException e) {

            logger.error( "setAutoCommit 错误", e );

            if (this.handleMySQLError( e )) {
                beginTransaction();
            }

        }
    }


    public void commitTransaction( ) {

        try {
            this.conn.commit();
            this.conn.setAutoCommit(true);
        } catch (SQLException e) {

            logger.error("conn.commit 错误", e);

            if( this.handleMySQLError( e ) ) {
                commitTransaction();
            }
        }
    }

    public void rollbackTransaction( ) {
        try {
            this.conn.rollback();
            this.conn.setAutoCommit(true);

        } catch (SQLException e) {

            logger.error("conn.rollback 错误", e);

            if( this.handleMySQLError( e ) ) {
                rollbackTransaction();
            }

        }

    }


    private boolean handleMySQLError( SQLException ex ) {

        String errNo = ex.getSQLState();
        int errCode = ex.getErrorCode();

        //连接不上一定有问题, 需要重连
        if ( ex instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException ||
                ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException ||
                ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLTransientConnectionException  ) {

            logger.error( "连接异常:" , ex );

            reconnecting = true;
            int oldConnHashCode = conn.hashCode();
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error( "conn.close() error", e );
            }

            while( !open() ){
                try {
                    Thread.sleep( 3000 );
                } catch (InterruptedException e) {
                   logger.error( "InterruptedException",  e );
                }
            }

            // Don't remove 'this' pointer unless you want to skip loading all prepared statements...
            if ( !rePrepareStatements( ) ) {
                logger.error("Could not re-prepare statements!");
                close( );
                return false;
            }

            logger.info("Worker to the MySQL server is active.");
            if (oldConnHashCode != conn.hashCode())
                logger.info(String.format("Successfully reconnected to %s @%s:%s (%s).",
                        connectionInfo.getDatabase(), connectionInfo.getHost(), connectionInfo.getPort(),
                        (connectionFlag == CONNECTION_ASYNC) ? "asynchronous" : "synchronous"));

            reconnecting = false;
            return true;

        } else if( ex instanceof  com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException ){

            logger.error( "sql 语法 错误", ex );

            return false;

        } else if ( ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLDataException  ){

            logger.error( "sql 数据 错误", ex );

            return false;
        } else if ( ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException  ){
            logger.error( "违反 数据库完整性 约束", ex );
            return false;
        } else if ( ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException ){
            logger.error( "查询 被中断", ex );
            return false;
        } else if( ex instanceof com.mysql.jdbc.exceptions.jdbc4.MySQLTimeoutException ){
            logger.error( "数据库操作超时,将进行重试", ex );
            try {
                Thread.sleep( 10 );
            } catch (InterruptedException e) {
                logger.error( "超时休眠异常:", e );
            }
            return true;
        } else if( ex instanceof  java.sql.SQLTransientException ){

            logger.error( "其他瞬时异常,休眠10毫秒, 再进行重试", ex );

            try {
                Thread.sleep( 10 );
            } catch (InterruptedException e) {
                logger.error( "超时休眠异常:", e );
            }

            return true;
        }else{
            logger.error( "数据库发生不能处理的异常", ex );
            return false;
        }


    }

    private boolean rePrepareStatements(){
        for( Map.Entry<String,String> entry : this.prepareSqlMap.entrySet(  ) ){
            if( !prepareStatement( entry.getKey(), entry.getValue( ) ) ){
                return false;
            }
        }
        return true;
    }

    public void ping() {
        MySQLPreparedStatement psmt = new MySQLPreparedStatement( PING_INDEX );
        this.query(psmt, new MySQLResultSetHandler(){

            @Override
            public void doResultSet(ResultSet rs) {
                try {
                    if( rs.next() ){
                        int result =  rs.getInt( 1 );
                        logger.info( "ping 数据库成功" );
                    }else{
                        logger.info( "ping 数据库失败" );
                    }
                } catch (SQLException e) {
                    logger.error( "ping 数据库失败" , e );
                }

            }
        });
    }

    public boolean lockIfReady() {
        return mutex.tryLock();
    }

    public void unlock() {
        mutex.unlock();
    }

}
